<?php

define('CMPP_CONNECT', 0x00000001); // 请求连接
define('CMPP_TERMINATE', 0x00000002); // 终止连接
define('CMPP_TERMINATE_RESP', 0x80000002); // 终止连接应答
define('CMPP_SUBMIT', 0x00000004); // 提交短信
define('CMPP_DELIVER', 0x00000005); // 短信下发
define('CMPP_DELIVER_RESP', 0x80000005); // 下发短信应答
define('CMPP_ACTIVE_TEST', 0x00000008); // 链路检测
define('CMPP_ACTIVE_TEST_RESP', 0x80000008); // 链路检测应答

//cmpp 2.0协议
class CmppServer
{
    protected $clients;
    protected $serv;
    protected $sequence_id = 1;
    protected $redis_client;
    protected $sp_code =922005;

    function run()
    {
        $this->serv = new swoole_server("127.0.0.1", 9502);
        $this->serv->set(array(
            'timeout' => 1, //select and epoll_wait timeout.
            'daemonize' => 1,  //以守护进程执行
            'reactor_num' => 1, //线程数
            'worker_num' => 1, //进程数
            'backlog' => 128, //Listen队列长度
            'max_conn' => 10000,
            'max_request' => 3000,
            'task_max_request'=>3000,
            'dispatch_mode' => 1,
        ));
        $this->serv->on('workerstart', array($this, 'onStart'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('close', array($this, 'onClose'));
        $this->serv->start();
    }

    /**
     * 获得redis连接
     */
    private function redis_conn()
    {
        if (!$this->redis_client) {
            try {
                $this->redis_client = new Redis();
                if (!$this->redis_client->pconnect('127.0.0.1', '6379')) {
                    return false;
                }
                if (!$this->redis_client->auth('12345678')) {
                    return false;
                }

            } catch (\Exception $e) {
                return false;
            }
        }
        return $this->redis_client;
    }

    /*
     * 登录成功后每隔两分钟发一次心跳
     */
    private function CMPP_CONNECT_RESP($pdu){
        $format = "CStatus/a16Auth/CVersion";
        $data = unpack($format, $pdu);
        $status = intval($data['Status']);
        if($status !== 0){
            return false;
        }
        $client = $this->clients['sys']['socket'];
        $header = pack('NNN',12,CMPP_ACTIVE_TEST,$this->sequence_id);
        $client->send($header);
        $this->sequence_id = $this->sequence_id + 1;
        $test = $this->sequence_id;
        $this->serv->tick(120000, function()use ($client, $test){
            $header = pack('NNN',12,CMPP_ACTIVE_TEST,$test);
            $client->send($header);
        });
    }

    /*
     * 运营商收到心跳包后回的响应包
     */
    private function CMPP_ACTIVE_TEST_RESP($data){
        extract(unpack("Nsequence_id", substr($data, 8, 4)));
        $format = "C";
        $body = pack($format, '');
        //消息长度
        $body_len = 13;
        $header = pack('NNN',$body_len,0x80000008,$sequence_id);
        $client = $this->clients['sys']['socket'];
        $data = $header.$body;
        $res = $client->send($data);
        $redis = $this->redis_conn();
        $key = 'active_cmpp_9503';
        $redis->set($key, date('Y-m-d H:i:s'), 150);
    }

    /*
     * 提交短信后运营商回的响应包
     */
    private function CMPP_SUBMIT_RESP($pdu, $send_list_id){
        $format = "JMsg_Id/CResult";
        $data = unpack($format, $pdu);
        $msg_id = $data['Msg_Id'];
        $status = $data['Result'];
        $redis = $this->redis_conn();
        // code_monitor
        $key = 'code_monitor_'.$send_list_id;
        $redis->set($key, 3, 3600*24);

        if($status === 0){ //失败
            $key = 'yanzhengma_'.$this->sp_code.'_'.$msg_id;
            $redis->set($key, $send_list_id, 3600*24);
        }else{ //成功
            $key = 'yanzhengma_send_result_list';
            $array['send_list_id'] = $send_list_id;
            $array['status'] = 0;
            $array['msg_id'] = $msg_id;
            $array['code'] = 'error';
            $array['receive_time'] = date('Y-m-d H:i:s');
            $json = json_encode($array);
            $redis->rpush($key,$json);
        }
    }

    /*
     * 回复运营商推的状态包
     */
    private function SEND_CMPP_DELIVER_RESP($sequence_id, $msg_id, $result){
        $format = "JC";
        $body = pack($format, $msg_id, $result);
        //消息长度
        $body_len = 21;
        $header = pack('NNN',$body_len,CMPP_DELIVER_RESP,$sequence_id);
        $client = $this->clients['sys']['socket'];
        $data = $header.$body;
        $res = $client->send($data);
    }

    /*
     * 运营商推的状态包
     */
    private function CMPP_DELIVER_RESP($data){
        $pdu = substr($data, 12);
        $content_length = strlen($pdu) - 73;
        $format = "JMsg_Id/a21Dest_Id/a10Service_Id/CTP_pid/CTP_udhi/CMsg_Fmt/a21Src_terminal_Id/CRegistered_Delivery/CMsg_Length/a{$content_length}Msg_Content/a8Reserved";
        $unpack_data = unpack($format, $pdu);
        $Msg_Id = $unpack_data['Msg_Id'];
        if($unpack_data['Registered_Delivery'] == 1){ //短信发送状态推送
            $msg_content = $unpack_data['Msg_Content'];
            $format = 'JMsg_Id/a7Stat/a10Submite_time/a10Done_time/a21Dest_terminal_Id/NSMSC_sequence';
            $status_arr = unpack($format, $msg_content);
            $stat = $status_arr['Stat'];
            $status = 0;
            if($stat == 'DELIVRD'){
                $status = 1;
            }
            $msg_id = $status_arr['Msg_Id'];
            $redis = $this->redis_conn();
            $key1 = 'yanzhengma_'.$this->sp_code.'_'.$msg_id;
            $send_list_id = $redis->get($key1);

            // code_monitor
            $key = 'code_monitor_'.$send_list_id;
            $redis->set($key, 4, 3600*24);

            if($send_list_id){
                $array['send_list_id'] = $send_list_id;
                $array['status'] = $status;
                $array['msg_id'] = $msg_id;
                $array['code'] = $stat;
                $array['receive_time'] = date('Y-m-d H:i:s');
                $json = json_encode($array);
                $key = 'yanzhengma_send_result_list';
                $res = $redis->rpush($key,$json);
            }
        }else{ // 回复内容推送
            $mobile = trim($unpack_data['Src_terminal_Id']); //回复的手机号
            $content = mb_convert_encoding($unpack_data['Msg_Content'], 'UTF-8', "UCS-2BE");//回复的内容
        }
        extract(unpack("Nsequence_id", substr($data, 8, 4)));
        $this->SEND_CMPP_DELIVER_RESP($sequence_id, $Msg_Id, 0);
    }

    /*
     * UCS-2BE编码
     */
    private function split_message_unicode($text)
    {
        $max_len = 134;
        $res = array();
        if (mb_strlen($text) <= 140) {
            $res[] = $text;
            return $res;
        }
        $pos = 0;
        $msg_sequence = $this->_message_sequence++;
        $num_messages = ceil(mb_strlen($text) / $max_len);
        $part_no = 1;
        while ($pos < mb_strlen($text)) {
            $ttext = mb_substr($text, $pos, $max_len);
            $pos += mb_strlen($ttext);
            $udh = pack("cccccc", 5, 0, 3, $msg_sequence, $num_messages, $part_no);
            $part_no++;
            $res[] = $udh . $ttext;
        }
        return $res;
    }

    /*
     * 连接
     */
    private function clientStart(){
        $socket = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->clients['sys'] = array(
            'socket' => $socket,
        );

        $socket->on("connect", function(swoole_client $cli) {
            $username = 922005;
            $password = '123456';
            $user = $username;
            $password = $password;
            $timestamp = date('mdHis');
            $version = 0x20;
            $AuthenticatorSource_ori = $user . pack('a9', '') . $password . $timestamp;
            $AuthenticatorSource = md5($AuthenticatorSource_ori, true);
            $format = "a6a16CN";
            //消息体
            $body = pack($format, $user, $AuthenticatorSource, $version, $timestamp);
            //消息长度
            $body_len =strlen($body)+12;
            //消息头
            $header = pack('NNN',$body_len,CMPP_CONNECT,$this->sequence_id);
            $this->sequence_id = $this->sequence_id;
            $data = $header.$body;
            $cli->send($data);
        });

        $socket->on("receive", function(swoole_client $cli, $data){
            if($data){
                extract(unpack("Nlength/Ncommand_id/Nsequence_number", $data));
                $command_id &= 0x0fffffff;
                $pdu = substr($data, 12);
                switch ($command_id) {
                    case CMPP_CONNECT:
                        $data = $this->CMPP_CONNECT_RESP($pdu);
                        break;
                    case CMPP_SUBMIT:
                       $data = $this->CMPP_SUBMIT_RESP($pdu, $sequence_number);
                       break;
                    case CMPP_DELIVER:
                       $data = $this->CMPP_DELIVER_RESP($data);
                       break;
                    case CMPP_ACTIVE_TEST:
                       $data = $this->CMPP_ACTIVE_TEST_RESP($data);
                       break;
                    default:
                        break;
                }
            }
        });
        $socket->on("error", function(swoole_client $cli){
            echo "error\n";
        });
        $socket->on("close", function(swoole_client $cli){
            echo "Connection close\n";
            $this->clientStart();
        });
        $socket->connect('47.94.96.6', 7890);
    }

    function onStart($serv)
    {
        $this->serv = $serv;
        echo "Server: start.Swoole version is [" . SWOOLE_VERSION . "]\n";
        $this->clientStart();
    }

    function onConnect($serv, $fd, $from_id)
    {
    }
    function onClose($serv, $fd, $from_id)
    {
    }
    function onReceive($serv, $fd, $from_id, $data)
    {
        $temp = json_decode($data, true);
        $mobile = $temp['mobile'];
        $content = $temp['content'];
        $send_list_id = $temp['send_list_id']; //发送总表的ID
        if($mobile && $content && $send_list_id){
            $redis = $this->redis_conn();
            $key = 'code_monitor_'.$send_list_id;
            $redis->set($key, 2, 3600*24);
            $unicode_text = mb_convert_encoding($content, "UCS-2BE", 'UTF-8');
            // 中文要用UCS-2BE编码，要不收到短信是乱码
            $multi_texts = $this->split_message_unicode($unicode_text);
            unset($unicode_text);
            reset($multi_texts);
            list($pos, $part) = each($multi_texts);
            $msg_content = $part;
            $msg_len = strlen($msg_content);//短信长度
            $Msg_Id = 1;
            $Pk_total = 1;
            $Pk_number = 1;
            $Registered_Delivery = 1;
            $Msg_level = 1;
            $Service_Id = 'code1';
            $Fee_UserType = 2;
            $Fee_terminal_Id = '';
            $TP_pId = 0;
            $TP_udhi = 0;
            $Msg_Fmt = 8;
            $Msg_src = 922005;
            $FeeType = '05';
            $FeeCode = '';
            $ValId_Time = '';
            $At_Time = '';
            $Src_Id = 1069032;
            $DestUsr_tl = 1;
            $Dest_terminal_Id = $mobile;
            $Msg_Length = $msg_len;
            $Msg_Content = $msg_content;
            $Reserve = '';
            $format = "a8CCCCa10Ca21CCCa6a2a6a17a17a21Ca21Ca{$msg_len}a8";
            $body = pack($format, $Msg_Id, $Pk_total, $Pk_number, $Registered_Delivery,$Msg_level,$Service_Id,$Fee_UserType,$Fee_terminal_Id,$TP_pId,$TP_udhi,$Msg_Fmt,$Msg_src,$FeeType,$FeeCode,$ValId_Time,$At_Time,$Src_Id,$DestUsr_tl,$Dest_terminal_Id,$Msg_Length,$Msg_Content,$Reserve);
            //消息长度
            $body_len =strlen($body)+12;
            //消息头
            $header = pack('NNN',$body_len,CMPP_SUBMIT,$send_list_id);
            $data = $header.$body;
            $socket = $this->clients['sys']['socket'];
            $socket->send($data);
        }
    }
}

$serv = new CmppServer();
$serv->run();
